{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "7c76c6a57297b63a",
   "metadata": {},
   "source": [
    "# Convert from GPS and Euler angles in EXIF to camera intrinsics and extrinsics"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "2a7807bab6265cdd",
   "metadata": {},
   "outputs": [],
   "source": [
    "%load_ext autoreload\n",
    "%autoreload 2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "initial_id",
   "metadata": {},
   "outputs": [],
   "source": [
    "import os\n",
    "from glob import glob\n",
    "import numpy as np\n",
    "import PIL.Image\n",
    "import PIL.ExifTags\n",
    "import xml.etree.ElementTree as ET\n",
    "import pymap3d\n",
    "from tqdm.auto import tqdm"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "eb7217aee6489a7a",
   "metadata": {},
   "outputs": [],
   "source": [
    "image_dir = os.path.expanduser(\"~/data/image_set/dbl/AerialPhotography\")\n",
    "output_path = os.path.expanduser(\"~/data/image_set/dbl/parsed_from_exif\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e4bf8a3ddde911e7",
   "metadata": {},
   "outputs": [],
   "source": [
    "image_name_list = [i[len(image_dir):].lstrip(\"/\\\\\") for i in\n",
    "                   glob(os.path.join(image_dir, \"**/*.[jJ][pP][gG]\"), recursive=True)]\n",
    "image_name_list.sort()\n",
    "image_name_list"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "ae3b13b0f0e711e8",
   "metadata": {},
   "source": [
    "# 1. Extract EXIF data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9a339ea45e702f5c",
   "metadata": {},
   "outputs": [],
   "source": [
    "def extract_exif_values(im):\n",
    "    raw_exif_data = im._getexif()\n",
    "    friendly_exif_data = {\n",
    "        PIL.ExifTags.TAGS[k]: v\n",
    "        for k, v in im._getexif().items()\n",
    "        if k in PIL.ExifTags.TAGS\n",
    "    }\n",
    "\n",
    "    return friendly_exif_data, raw_exif_data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ec257ac18a913d93",
   "metadata": {},
   "outputs": [],
   "source": [
    "# https://exiftool.org/TagNames/DJI.html\n",
    "def extract_xmp_values(im):\n",
    "    xmp = {}\n",
    "\n",
    "    for segment, content in im.applist:\n",
    "        # fine xmp content\n",
    "        marker, body = content.split(b'\\x00', 1)\n",
    "        if segment == 'APP1' and marker == b'http://ns.adobe.com/xap/1.0/':\n",
    "            # convert to string\n",
    "            str_body = body.decode()\n",
    "            tree = ET.fromstring(str_body)\n",
    "            for child in tree.iter():\n",
    "                for attr in child.attrib:\n",
    "                    key = attr.rsplit(\"}\", maxsplit=1)\n",
    "                    if len(key) == 2:\n",
    "                        key = key[1]\n",
    "                    else:\n",
    "                        key = attr\n",
    "                    xmp[key] = child.attrib[attr]\n",
    "    return xmp"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e6a0a75631cd6b4a",
   "metadata": {},
   "outputs": [],
   "source": [
    "def extract(image_path: str):\n",
    "    with PIL.Image.open(image_path) as im:\n",
    "        exif, _ = extract_exif_values(im)\n",
    "        xmp = extract_xmp_values(im)\n",
    "        size = im.size\n",
    "    return exif, xmp, size"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b03b07ea137f219e",
   "metadata": {},
   "outputs": [],
   "source": [
    "exif_list = []\n",
    "xmp_list = []\n",
    "image_width_list = []\n",
    "image_height_list = []\n",
    "\n",
    "for i in tqdm(image_name_list):\n",
    "    exif, xmp, size = extract(os.path.join(image_dir, i))\n",
    "    exif_list.append(exif)\n",
    "    xmp_list.append(xmp)\n",
    "    image_width_list.append(size[0])\n",
    "    image_height_list.append(size[1])"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "cdbc9b740c4fad4d",
   "metadata": {},
   "source": [
    "# 2. Parse EXIF"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7b9a095aa8c1c64a",
   "metadata": {},
   "outputs": [],
   "source": [
    "def gps_tuple_to_angle(i) -> float:\n",
    "    return (((float(i[2]) / 60.) + float(i[1])) / 60.) + float(i[0])\n",
    "\n",
    "\n",
    "def parse_GPSInfo(info: dict):\n",
    "    decoded = {}\n",
    "    for key in info.keys():\n",
    "        decode = PIL.ExifTags.GPSTAGS.get(key, key)\n",
    "        decoded[decode] = info[key]\n",
    "\n",
    "    simplified = {\n",
    "        \"LatitudeRef\": decoded[\"GPSLatitudeRef\"],\n",
    "        \"LongitudeRef\": decoded[\"GPSLongitudeRef\"],\n",
    "        \"AltitudeRef\": int.from_bytes(decoded[\"GPSAltitudeRef\"], \"little\"),\n",
    "        \"Latitude\": gps_tuple_to_angle(decoded[\"GPSLatitude\"]),\n",
    "        \"Longitude\": gps_tuple_to_angle(decoded[\"GPSLongitude\"]),\n",
    "        \"Altitude\": float(decoded[\"GPSAltitude\"]),\n",
    "    }\n",
    "    return simplified\n",
    "\n",
    "\n",
    "def get_gps_data(exif, xmp):\n",
    "    if \"Lat\" in xmp:\n",
    "        return float(xmp[\"Lat\"]), float(xmp[\"Lon\"]), float(xmp[\"AbsAlt\"]), float(xmp[\"RltAlt\"])\n",
    "    gps_data = parse_GPSInfo(exif[\"GPSInfo\"])\n",
    "    assert gps_data[\"LatitudeRef\"] == \"N\", gps_data[\"LatitudeRef\"]\n",
    "    assert gps_data[\"LongitudeRef\"] == \"E\", gps_data[\"LongitudeRef\"]\n",
    "    return gps_data[\"Latitude\"], gps_data[\"Longitude\"], xmp[\"AbsoluteAltitude\"], xmp[\"RelativeAltitude\"]\n",
    "\n",
    "\n",
    "def get_euler_angle_data(xmp):\n",
    "    if \"Yaw\" in xmp:\n",
    "        return float(xmp[\"Yaw\"]), float(xmp[\"Pitch\"]), float(xmp[\"Roll\"])\n",
    "\n",
    "    return float(xmp[\"GimbalYawDegree\"]), float(xmp[\"GimbalPitchDegree\"]), float(xmp[\"GimbalRollDegree\"])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "485f59a47a870dfd",
   "metadata": {},
   "outputs": [],
   "source": [
    "camera_model_to_camera_id = {}\n",
    "camera_id_to_camera_model = {}\n",
    "valid_image_list = []\n",
    "camera_width_list = []\n",
    "camera_height_list = []\n",
    "camera_focal_length_list = []\n",
    "camera_has_prior_focal_length = []\n",
    "gps_list = []\n",
    "euler_angle_list = []\n",
    "image_camera_id_list = []\n",
    "\n",
    "camera_model_to_sensor_size = {\n",
    "    \"PSDK102S_V2137H\": 23.5,\n",
    "    \"PSDK102S_V2137Q\": 23.5,\n",
    "    \"PSDK102S_V2137X\": 23.5,\n",
    "    \"PSDK102S_V2137Y\": 23.5,\n",
    "    \"PSDK102S_V2137Z\": 23.5,\n",
    "    \"ZENMUSE Z30\": 5.14,  # not known accurately\n",
    "}  # REMEMBER TO UPDATE IT\n",
    "\n",
    "for image_idx in tqdm(range(len(image_name_list))):\n",
    "    exif = exif_list[image_idx]\n",
    "    xmp = xmp_list[image_idx]\n",
    "\n",
    "    try:\n",
    "        gps_data = get_gps_data(exif, xmp)\n",
    "        skip = True\n",
    "        for i in gps_data:\n",
    "            if i != 0:\n",
    "                skip = False\n",
    "                break\n",
    "        if skip:\n",
    "            print(\"Skip {}\".format(image_name_list[image_idx]))\n",
    "            continue\n",
    "    except AssertionError:\n",
    "        print(\"Skip {} for invalid GPSInfo\".format(image_name_list[image_idx]))\n",
    "        continue\n",
    "\n",
    "    camera_model = exif[\"Model\"]\n",
    "\n",
    "    camera_key = \"{}-{}-{}-{}\".format(\n",
    "        camera_model,\n",
    "        exif[\"FocalLength\"],\n",
    "        image_width_list[image_idx],\n",
    "        image_height_list[image_idx],\n",
    "    )\n",
    "    camera_id = camera_model_to_camera_id.get(camera_key, None)\n",
    "    if camera_id is None:\n",
    "        # if `FocalLengthIn35mmFilm` presents in EXIF, use it to calculate the focal length,\n",
    "        # or calculate from sensor size\n",
    "        focal_length_in_35mm = float(exif.get(\"FocalLengthIn35mmFilm\", -1))\n",
    "        if focal_length_in_35mm > 0:\n",
    "            focal_length = focal_length_in_35mm * image_width_list[image_idx] / 36  # 36x24mm\n",
    "            camera_has_prior_focal_length.append(1)\n",
    "        else:\n",
    "            sensor_size = camera_model_to_sensor_size[camera_model]\n",
    "            if sensor_size > 0:\n",
    "                # known sensor size\n",
    "                focal_length = float(exif[\"FocalLength\"]) * image_width_list[image_idx] / sensor_size\n",
    "                camera_has_prior_focal_length.append(1)\n",
    "            else:\n",
    "                # unknown\n",
    "                focal_length = 1.25 * max(image_width_list[image_idx], image_height_list[image_idx])\n",
    "                camera_has_prior_focal_length.append(0)\n",
    "\n",
    "        camera_id = len(camera_width_list)\n",
    "        camera_width_list.append(image_width_list[image_idx])\n",
    "        camera_height_list.append(image_height_list[image_idx])\n",
    "        camera_focal_length_list.append(focal_length)\n",
    "        camera_model_to_camera_id[camera_key] = camera_id\n",
    "        camera_id_to_camera_model[camera_id] = camera_model\n",
    "\n",
    "    image_camera_id_list.append(camera_id)\n",
    "\n",
    "    gps_list.append(gps_data)\n",
    "    euler_angle_list.append(get_euler_angle_data(xmp))  # Z-Y-X\n",
    "\n",
    "    valid_image_list.append(image_name_list[image_idx])\n",
    "\n",
    "len(valid_image_list)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "d8d34d3297430721",
   "metadata": {},
   "source": [
    "# 3. Convert to poses"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "193cf631a591cce6",
   "metadata": {},
   "outputs": [],
   "source": [
    "gps_array = np.array(gps_list, dtype=np.float64)\n",
    "euler_angle_array = np.radians(np.array(euler_angle_list, dtype=np.float64))  # [N, Z-Y-X]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "490c6f074304ba02",
   "metadata": {},
   "outputs": [],
   "source": [
    "mid_center = (np.min(gps_array, axis=0) + np.max(gps_array, axis=0)) * 0.5\n",
    "mid_center"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "decf3dd364bcaa1",
   "metadata": {},
   "outputs": [],
   "source": [
    "xyz_in_ned = np.stack(pymap3d.geodetic2ned(\n",
    "    gps_array[:, 0], gps_array[:, 1], gps_array[:, 2],\n",
    "    mid_center[0], mid_center[1], mid_center[2],\n",
    "), axis=1)\n",
    "xyz_in_ned"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "939dd7bd1921e361",
   "metadata": {},
   "outputs": [],
   "source": [
    "import viser.transforms as vt"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c7146456f0677fe1",
   "metadata": {},
   "outputs": [],
   "source": [
    "z_R = vt.SO3.from_z_radians(euler_angle_array[:, 0]).as_matrix()\n",
    "y_R = vt.SO3.from_y_radians(euler_angle_array[:, 1]).as_matrix()\n",
    "x_R = vt.SO3.from_x_radians(euler_angle_array[:, 2]).as_matrix()\n",
    "R = z_R @ y_R @ x_R  # in Z-Y-X order\n",
    "\n",
    "# An extra rotation for the view direction of the camera (look at Z+(Down) to X+ (North) of the world).\n",
    "# Different cameras has different extra rotations.\n",
    "y_extra_R = vt.SO3.from_y_radians(np.pi / 2).as_matrix()\n",
    "counter_clock_wise_extra_R = y_extra_R @ vt.SO3.from_z_radians(np.pi / 2).as_matrix()\n",
    "clockwise_extra_R = y_extra_R @ vt.SO3.from_z_radians(-np.pi / 2).as_matrix()\n",
    "\n",
    "extra_R_list = []\n",
    "for image_camera_id in image_camera_id_list:\n",
    "    image_camera_model = camera_id_to_camera_model[image_camera_id]\n",
    "    if image_camera_model.startswith(\"PSDK102S_V2137\") and not image_camera_model.endswith(\"X\"):\n",
    "        extra_R_list.append(clockwise_extra_R)\n",
    "    else:\n",
    "        extra_R_list.append(counter_clock_wise_extra_R)\n",
    "        \n",
    "R = R @ np.stack(extra_R_list)\n",
    "\n",
    "R.shape"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "325576e9f564175c",
   "metadata": {},
   "outputs": [],
   "source": [
    "c2w = np.concatenate(\n",
    "    [\n",
    "        np.concatenate([R, xyz_in_ned[..., None]], axis=-1),\n",
    "        np.repeat(np.asarray([0., 0., 0., 1.], dtype=R.dtype)[None, None, :], R.shape[0], axis=0),\n",
    "    ],\n",
    "    axis=1,\n",
    ")\n",
    "# make sure the concatenating is correct\n",
    "assert (c2w[0, :3, :3] == R[0]).all()\n",
    "assert (c2w[0, :3, 3] == xyz_in_ned[0]).all()\n",
    "assert (c2w[:, -1, -1] == 1.).all()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d92de1b4877f783d",
   "metadata": {},
   "outputs": [],
   "source": [
    "# flip the whole world upside down (NED to ENU: switch the X and Y, flip the Z)\n",
    "ned2enu = np.asarray([\n",
    "    [0., 1., 0., 0., ],\n",
    "    [1., 0., 0., 0., ],\n",
    "    [0., 0., -1., 0.],\n",
    "    [0., 0., 0., 1.],\n",
    "], dtype=c2w.dtype)\n",
    "c2w_in_enu = ned2enu @ c2w\n",
    "assert (c2w_in_enu[0, 0] == c2w[0, 1]).all()\n",
    "assert (c2w_in_enu[0, 1] == c2w[0, 0]).all()\n",
    "assert (c2w_in_enu[0, 2] == -c2w[0, 2]).all()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b9e04f64b3a81dcd",
   "metadata": {},
   "outputs": [],
   "source": [
    "xyz_in_enu = c2w_in_enu[:, :3, 3]"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7308681dc3c97386",
   "metadata": {},
   "outputs": [],
   "source": [
    "# store as npy\n",
    "os.makedirs(output_path, exist_ok=True)\n",
    "np.save(\n",
    "    os.path.join(output_path, \"parsed.npy\"),\n",
    "    {\n",
    "        \"image_name_list\": valid_image_list,\n",
    "        \"gps\": gps_array,\n",
    "        \"gps_origin\": mid_center,\n",
    "        \"euler_angle\": euler_angle_array,\n",
    "        \"camera\": [\n",
    "            np.asarray(camera_width_list, dtype=np.int32),\n",
    "            np.asarray(camera_height_list, dtype=np.int32),\n",
    "            np.asarray(camera_focal_length_list, dtype=np.float64),\n",
    "            np.asarray(camera_has_prior_focal_length, dtype=np.uint8),\n",
    "        ],\n",
    "        \"c2w\": c2w_in_enu,\n",
    "        \"image_camera_id_list\": image_camera_id_list,\n",
    "    },\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "fecdcc2c193fc528",
   "metadata": {},
   "source": [
    "# 4. Store as a colmap model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "7db4d93d4c53aeba",
   "metadata": {},
   "outputs": [],
   "source": [
    "from internal.utils import colmap"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "bdd88fb8c5c6f72f",
   "metadata": {},
   "outputs": [],
   "source": [
    "colmap_output_path = output_path\n",
    "colmap_sparse_model_output_path = os.path.join(colmap_output_path, \"sparse_from_exif\")\n",
    "os.makedirs(colmap_sparse_model_output_path, exist_ok=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a3cfdf6f60ee572e",
   "metadata": {},
   "outputs": [],
   "source": [
    "colmap_db_path = os.path.join(colmap_output_path, \"colmap.db\")\n",
    "assert os.path.exists(colmap_db_path) is False\n",
    "import subprocess\n",
    "\n",
    "subprocess.call([\n",
    "    \"colmap\",\n",
    "    \"database_creator\",\n",
    "    \"--database_path={}\".format(colmap_db_path),\n",
    "])\n",
    "colmap_db_path"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "507f3ae3bd5cce46",
   "metadata": {},
   "outputs": [],
   "source": [
    "import sqlite3\n",
    "\n",
    "colmap_db = sqlite3.connect(colmap_db_path)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a021f480b8d678f7",
   "metadata": {},
   "outputs": [],
   "source": [
    "try:\n",
    "    colmap_cameras = {}\n",
    "    for camera_idx in range(len(camera_width_list)):\n",
    "        colmap_camera_idx = camera_idx + 1\n",
    "        colmap_db.execute(\n",
    "            \"INSERT INTO `cameras` \"\n",
    "            \"(camera_id, model, width, height, params, prior_focal_length) \"\n",
    "            \"VALUES(?, ?, ?, ?, ?, ?)\",\n",
    "            [\n",
    "                colmap_camera_idx,\n",
    "                4,  # OPENCV\n",
    "                camera_width_list[camera_idx],\n",
    "                camera_height_list[camera_idx],\n",
    "                np.asarray([\n",
    "                    camera_focal_length_list[camera_idx],\n",
    "                    camera_focal_length_list[camera_idx],\n",
    "                    camera_width_list[camera_idx] / 2,\n",
    "                    camera_height_list[camera_idx] / 2,\n",
    "                    0.,\n",
    "                    0.,\n",
    "                    0.,\n",
    "                    0.,\n",
    "                ], dtype=np.float64).tostring(),\n",
    "                camera_has_prior_focal_length[camera_idx],\n",
    "            ],\n",
    "        )\n",
    "\n",
    "        colmap_cameras[colmap_camera_idx] = colmap.Camera(\n",
    "            id=colmap_camera_idx,\n",
    "            model=\"PINHOLE\",\n",
    "            width=camera_width_list[camera_idx],\n",
    "            height=camera_height_list[camera_idx],\n",
    "            params=np.asarray([\n",
    "                camera_focal_length_list[camera_idx],\n",
    "                camera_focal_length_list[camera_idx],\n",
    "                camera_width_list[camera_idx] / 2,\n",
    "                camera_height_list[camera_idx] / 2,\n",
    "            ], dtype=np.float64),  # [fx, fy, cx, cy]\n",
    "        )\n",
    "\n",
    "    colmap_images = {}\n",
    "    w2c = np.linalg.inv(c2w_in_enu)\n",
    "    for image_idx in range(len(valid_image_list)):\n",
    "        colmap_image_idx = image_idx + 1\n",
    "        qvec = colmap.rotmat2qvec(w2c[image_idx, :3, :3])\n",
    "        tvec = w2c[image_idx, :3, 3]\n",
    "\n",
    "        colmap_db.execute(\n",
    "            \"INSERT INTO `images` \"\n",
    "            \"VALUES (?, ?, ?)\",\n",
    "            [\n",
    "                colmap_image_idx,\n",
    "                valid_image_list[image_idx],\n",
    "                image_camera_id_list[image_idx] + 1,\n",
    "            ],\n",
    "        )\n",
    "\n",
    "        colmap_images[colmap_image_idx] = colmap.Image(\n",
    "            id=colmap_image_idx,\n",
    "            qvec=qvec,\n",
    "            tvec=tvec,\n",
    "            camera_id=image_camera_id_list[image_idx] + 1,\n",
    "            name=valid_image_list[image_idx],\n",
    "            xys=np.asarray([]),\n",
    "            point3D_ids=np.asarray([]),\n",
    "        )\n",
    "except Exception as e:\n",
    "    colmap_db.rollback()\n",
    "    raise e\n",
    "\n",
    "colmap_db.commit()\n",
    "colmap_db.close()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "45c6c6b2ad9d42e2",
   "metadata": {},
   "outputs": [],
   "source": [
    "colmap.write_cameras_binary(colmap_cameras, os.path.join(colmap_sparse_model_output_path, \"cameras.bin\"))\n",
    "colmap.write_images_binary(colmap_images, os.path.join(colmap_sparse_model_output_path, \"images.bin\"))\n",
    "colmap.write_points3D_binary({}, os.path.join(colmap_sparse_model_output_path, \"points3D.bin\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ecaf9e965a2757f5",
   "metadata": {},
   "outputs": [],
   "source": [
    "os.makedirs(\"{}_text\".format(colmap_sparse_model_output_path), exist_ok=True)\n",
    "subprocess.run([\n",
    "    \"colmap\",\n",
    "    \"model_converter\",\n",
    "    \"--input_path={}\".format(colmap_sparse_model_output_path),\n",
    "    \"--output_path={}_text\".format(colmap_sparse_model_output_path),\n",
    "    \"--output_type=TXT\",\n",
    "])"
   ]
  }
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
